/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.kafka;



import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import org.apache.kafka.common.TopicPartition;

import org.joda.time.Duration;

import org.joda.time.Instant;





import java.util.Optional;



/**

 * A policy for custom record timestamps where timestamps within a partition are expected to be

 * roughly monotonically increasing with a cap on out of order event delays (say 1 minute). The

 * watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.

 * However, the watermark is never set in the future and is capped at 'now - max delay'. In

 * addition, the watermark is advanced to 'now - max delay' when a partition is idle.

 */

public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {

  /** Amount subtracted from the reference time to obtain the watermark. */
  private final Duration maxDelay;

  /** Extracts the event timestamp from a Kafka record. */
  private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;

  /**
   * Largest event timestamp observed so far. Before any record is seen it holds a value derived
   * from the check-pointed watermark (see the constructor).
   */
  private Instant maxEventTimestamp;

  /**
   * Creates a policy for custom record timestamps that are expected to be roughly monotonically
   * increasing, with out of order delays smaller than {@code maxDelay}. The watermark reported at
   * any time is {@code Min(now(), max_event_timestamp) - maxDelay}.
   *
   * @param timestampFunction A function to extract timestamp from the record
   * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record
   *     is expected to be after {@code current record timestamp - maxDelay}.
   * @param previousWatermark Latest check-pointed watermark, see {@link
   *     TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}. When absent,
   *     {@link BoundedWindow#TIMESTAMP_MIN_VALUE} is used in its place.
   */
  public CustomTimestampPolicyWithLimitedDelay(
      SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
      Duration maxDelay,
      Optional<Instant> previousWatermark) {
    this.timestampFunction = timestampFunction;
    this.maxDelay = maxDelay;

    // The check-pointed watermark is not a max event timestamp (it may even lie in the future).
    // Shifting it forward by maxDelay makes 'maxEventTimestamp - maxDelay' equal to the
    // check-pointed watermark until the first record is read.
    Instant restoredWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    this.maxEventTimestamp = restoredWatermark.plus(maxDelay);
  }

  /**
   * Returns the timestamp produced by the timestamp function for {@code record}, and remembers it
   * if it is later than every timestamp tracked so far.
   *
   * @param ctx partition context (not consulted by this method)
   * @param record the record whose timestamp is extracted
   * @return the extracted event timestamp
   */
  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
    Instant eventTime = timestampFunction.apply(record);
    boolean isNewMaximum = eventTime.isAfter(maxEventTimestamp);
    if (isNewMaximum) {
      maxEventTimestamp = eventTime;
    }
    return eventTime;
  }

  /**
   * Returns the current watermark, evaluated against the wall-clock time.
   *
   * <p>The watermark is {@code maxEventTimestamp - maxDelay}, with two exceptions:
   *
   * <ul>
   *   <li>the max event timestamp lies in the future (probably due to incorrect timestamps): it
   *       is capped to 'now';
   *   <li>the partition is idle: the watermark is advanced on the assumption that future records
   *       will have a timestamp of at least 'now - maxDelay'.
   * </ul>
   *
   * <p>This covers the majority of common use cases for custom timestamps; users can implement
   * their own policy if it does not fit.
   *
   * @param ctx partition context providing the backlog and the backlog check time
   * @return the watermark for the partition
   */
  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return getWatermark(ctx, Instant.now());
  }

  /**
   * Computes the watermark relative to an explicitly supplied 'now', which lets tests control the
   * clock.
   *
   * @param ctx partition context providing the backlog and the backlog check time
   * @param now the instant to treat as the current time
   * @return the watermark for the partition
   */
  @VisibleForTesting
  Instant getWatermark(PartitionContext ctx, Instant now) {
    // Max event timestamp is in the future: cap the reference time to 'now'.
    if (maxEventTimestamp.isAfter(now)) {
      return now.minus(maxDelay);
    }

    // Idle partition: advance the watermark based on the time the backlog was last checked.
    if (isIdleSinceLatestEvent(ctx)) {
      return ctx.getBacklogCheckTime().minus(maxDelay);
    }

    // Regular case.
    return maxEventTimestamp.minus(maxDelay);
  }

  /**
   * Tells whether the partition counts as idle: there is no backlog, the backlog check time minus
   * {@code maxDelay} is after the max event timestamp, and the max event timestamp is positive.
   * The conditions are evaluated in this order and stop at the first one that fails.
   */
  private boolean isIdleSinceLatestEvent(PartitionContext ctx) {
    if (ctx.getMessageBacklog() != 0) {
      return false;
    }
    if (!ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp)) {
      return false;
    }
    // Intended to hold only once a record with a positive timestamp has been read.
    return maxEventTimestamp.getMillis() > 0;
  }
}